
package com.databus;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.*;

import com.config.ClientCfg;
import com.config.CommandParse;
import com.google.protobuf.ByteString;
import com.google.protobuf.GeneratedMessage;
import com.packet.PackageData;
import com.packet.PackageHeader;
import com.protobuf.MsgExpress;
import com.protobuf.MsgExpress.SimpleSubscription;
import com.protobuf.MsgExpress.UnSubscribeData;
import com.thread.HeartBeatThread;
import com.thread.MsgLoopThread;
import com.thread.MsgReceiveThread;
import com.thread.MsgSendThread;
import com.thread.ServerConnect;
import com.thread.TimeOutThread;
import com.tool.Utilities;

public class DatabusConnector implements MessageCallBack, TimeOutCallBack {

    private static final String Version = "2019-01-17";
    private ClassLoader loader=null;
    public DatabusConnector()
    {
        //this.loader=loader;
        mbServer = false;
        ClassLoader loader=this.getClass().getClassLoader();
        mStatus = EConnectStatus.ConnectStatus_Offline;
        mPackDataSyncQueue = new LinkedBlockingQueue<PackageData>();
        mPackDataAsynQueue = new LinkedBlockingQueue<PackageData>();
        mSyncMsgArray = new SyncMessageArray(SerialNumGenerator.MaxSyncSerialNum);
        mCallBackMap = new ConcurrentHashMap<Integer, MsgCallbackInfo>();

        mWaitHeartBeatRspNum = 0;

        CommandParse.Hashcode("");
    }

    public  boolean  Initialize(ClientCfg cfg)
    {
        if(mIsInit)
            return true;

        mIsInit = true;

        Log.info("Initialize, Version = " + Version);
        mIsConnected = false;
        mClientCfg = cfg;
        mConnectThread = new ServerConnect(this, mClientCfg);

        mExecutorService = Executors.newCachedThreadPool();
        mExecutorService.execute(mConnectThread);

        int nCount = 0;
        while(nCount++ <= 2000)
        {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            if(mIsConnected)
            {
                Log.info("Init Success!");
                return true;
            }
            else
            {
                if(nCount%10==0)
                    Log.info("Wait to connect ...");
            }
        }
        Log.error("Connect To Server Failed!");
        return false;
    }

    // 闁插﹥鏂侀崙鑺ユ殶
    public void Release()
    {
        Log.info("Release MsgExpress ... ");
        Close();
        for(OnConnectChangedListener listener:setStatusListener)
        {
            listener.onConnectChanged(EConnectStatus.ConnectStatus_Offline);
        };
    }

    public  FutureResponse<Object> SendMessage(Object request)
    {
        if (mStatus != EConnectStatus.ConnectStatus_Online
                && !(request instanceof MsgExpress.LoginInfo)
                && !(request instanceof MsgExpress.ComplexSubscribeData))
        {
            Log.error("Send Message failed, status is offline");
            return null;
        }
        int serNum =  SerialNumGenerator.GetSyncNum();

        FutureResponse<Object> future=mSyncMsgArray.RegistMsg(serNum);
        if(future==null)
        {
            Log.error("Register future message failed,serial="+serNum);
            return null;
        }

        if(!mSendThread.SendMessage(request, serNum, null))
        {
            mSyncMsgArray.UnRegistSyncMsg(serNum);
            return null;
        }
        return future;
    }

    public Object SendMessage(Object request, int milliseconds, Options op)
    {
        if (mStatus != EConnectStatus.ConnectStatus_Online
                && !(request instanceof MsgExpress.Login)
                && !(request instanceof MsgExpress.LoginInfo)
                && !(request instanceof MsgExpress.ComplexSubscribeData))
        {
            Log.error("Send Message failed, status is offline");
            return null;
        }

        int serNum = (null != op && 0 != op.serial) ? op.serial : SerialNumGenerator.GetSyncNum();

        if(!mSyncMsgArray.RegistSyncMsg(serNum))
        {
            return null;
        }

        if(!mSendThread.SendMessage(request, serNum, op))
        {
            mSyncMsgArray.UnRegistSyncMsg(serNum);
            return null;
        }
        Object rsp = mSyncMsgArray.WaitForResponse(serNum, milliseconds);
        if(rsp instanceof MsgExpress.ErrMessage)
        {
            MsgExpress.ErrMessage errMsg = (MsgExpress.ErrMessage)rsp;
            Log.error("rsp msg error, code = " + errMsg.getErrcode() + " msg = " + errMsg.getErrmsg());
            return null;
        }
        else
        {
            return rsp;
        }
    }

    public boolean PostMessage(Object request)
    {
        if(mStatus != EConnectStatus.ConnectStatus_Online)
        {
            Log.error("Post Message failed, status is offline");
            return false;
        }

        return mSendThread.SendMessage(request, SerialNumGenerator.GetAsynNum(), null);
    }

    public boolean PostMessage(int cmd, byte[] buff, Options op)
    {
        if(mStatus != EConnectStatus.ConnectStatus_Online)
        {
            Log.error("Post Message failed, status is offline");
            return false;
        }

        return mSendThread.SendMessage(cmd, buff, op);
    }

    public boolean PostMessage(Object request, PostMsgCallBack cb, Object arg, int milliseconds, Options op)
    {
        if(mStatus != EConnectStatus.ConnectStatus_Online)
        {
            Log.error("Post Message failed, status is offline");
            return false;
        }

        int serNum = (null != op && 0 != op.serial) ? op.serial : SerialNumGenerator.GetAsynNum();
        if(null != cb)
        {
            MsgCallbackInfo info = new MsgCallbackInfo();
            info.cb = cb;
            info.params.req = request;
            info.params.arg = arg;

            mCallBackMap.put(serNum, info);

            mTimeOutThread.AddTimeOut(serNum, milliseconds);
        }

        if(!mSendThread.SendMessage(request, serNum, op))
        {
            mCallBackMap.remove(serNum);
            return false;
        }

        return true;
    }

    // 閸欐垿锟戒礁绨茬粵鏃�绉烽幁锟�
    public boolean Reply(PackageHeader header, Object replyMsg)
    {
        Log.debug("Reply Msg : " + replyMsg.toString());
        return mSendThread.ReplyMessage(header, replyMsg);
    }
    public boolean Reply(PackageHeader header, int errCode, String errMsg)
    {
        Log.debug("Reply error Msg, errCode = " + errCode + " errMsg" + errMsg);
        header.command = 0;
        MsgExpress.ErrMessage.Builder builder = MsgExpress.ErrMessage.newBuilder();
        builder.setErrcode(errCode);
        builder.setErrmsg(errMsg);
        return mSendThread.ReplyMessage(header, builder.build());
    }

    public boolean Publish(Object pubData)
    {
        Log.debug("Publish Data : " + pubData.toString());
        return Publish(pubData,null,null,0);
    }
    
    public boolean Publish(Object msg, PostMsgCallBack cb, Object arg, int milliseconds)
    {
        Log.debug("Publish Data : " + msg.toString());
        Options op = new Options();
        op.type = PackageHeader.MsgType.Publish;
        op.sequence=true;
        return PostMessage(msg, cb, arg, milliseconds, op);
    }

    public boolean Subscribe(int subId, GeneratedMessage data, PostMsgCallBack cb, Object arg, int milliseconds)
    {
        Integer cmd = CommandParse.GetCommand(data.getClass().getName());
        Log.debug("Subscribe Data : " + data.toString());
        SimpleSubscription simpleSub = SimpleSubscription.newBuilder()
                .setSubid(subId).addSubmsg(data.toByteString()).setTopic(cmd)
                .build();
        return PostMessage(simpleSub, cb, arg, milliseconds, null);
    }

    public boolean Subscribe(int subId, List<? extends GeneratedMessage> datas, PostMsgCallBack cb, Object arg, int milliseconds)
    {
        Integer cmd = CommandParse.GetCommand(datas.get(0).getClass().getName());
        Log.debug("Subscribe Data List: " + datas.toString());
        List<ByteString> dataList = new ArrayList<ByteString>(datas.size());
        for (GeneratedMessage data : datas)
        {
            dataList.add(data.toByteString());
        }
        SimpleSubscription simpleSub = SimpleSubscription.newBuilder()
                .setSubid(subId).addAllSubmsg(dataList).setTopic(cmd).build();
        return PostMessage(simpleSub, cb, arg, milliseconds, null);
    }

    public boolean UnSubscribe(int subId, PostMsgCallBack cb, Object arg, int milliseconds)
    {
        return PostMessage(UnSubscribeData.newBuilder().setSubid(subId).build(), cb, arg, milliseconds, null);
    }

    @Override
    public void OnRequest(PackageData packData) {
        // TODO Auto-generated method stub
        if(!mbServer)
        {
            Log.debug("Receive Request, typeName = " + packData.mMsg.getClass().getSimpleName()
                    + " msg = " + packData.mMsg.toString());
        }

        if(packData.mMsg instanceof  MsgExpress.ErrMessage)
        {
            this.Reply(packData.mHeader,packData.mMsg);
            return;
        }
        mWaitHeartBeatRspNum = 0;

        if(provider!=null)
        {
            provider.onRequest(packData);
        };
    }

    @Override
    public void OnResponse(PackageData packData) {
        // TODO Auto-generated method stub
        if(!mbServer)
        {
            Log.debug("Receive Response, typeName = " + packData.mMsg.getClass().getSimpleName()
                    + " msg = " + packData.mMsg.toString());
        }

        mWaitHeartBeatRspNum = 0;

        if(packData.mMsg instanceof MsgExpress.HeartBeatResponse)
        {
            return;
        }

        if(packData.mHeader.serialnum < SerialNumGenerator.MaxSyncSerialNum)
        {
            mSyncMsgArray.CopyMessageAndNotify(packData.mHeader.serialnum, packData.mMsg);
        }
        else
        {
            MsgCallbackInfo info = mCallBackMap.remove(packData.mHeader.serialnum);
            if(null != info)
            {
                if(packData.mMsg instanceof MsgExpress.ErrMessage)
                {
                    info.params.result = ((MsgExpress.ErrMessage)packData.mMsg).getErrcode();
                    info.params.errMsg = ((MsgExpress.ErrMessage)packData.mMsg).getErrmsg();
                }
                else
                {
                    info.params.rspData = packData;
                }
                info.cb.OnResponse(info.params);
            }

        }
    }

    @Override
    final public void OnPublish(PackageData packData) {
        // TODO Auto-generated method stub
        mWaitHeartBeatRspNum = 0;
        for(OnPublishListener listener:setPublishListener)
        {
            listener.onPublish(packData);
        };
    }

    @Override
    final public void OnConnectChange(EConnectStatus status) {
        // TODO Auto-generated method stub
        Log.info("OnConnectChange : " + status);
        if(status == mStatus)
        {
            Log.warn("same status, ignore, status = " + status);
            return;
        }

        synchronized(this)
        {
            if(EConnectStatus.ConnectStatus_Online == status)
            {
                mSendThread = new MsgSendThread(mConnectThread.GetOutputStream(),
                        mClientCfg.mZlibSwitch, mClientCfg.mZlibThreshold, mClientCfg.mSendQueueSize);
                mReceiveThread = new MsgReceiveThread(this, mConnectThread.GetInputStream(), mClientCfg.mBufferSize,
                        mPackDataSyncQueue, mPackDataAsynQueue);
                mHeartBeatThread = new HeartBeatThread(mSendThread, null, mClientCfg.mHeartBeatInterval);
                mTimeOutThread = new TimeOutThread(this);

                if(null == mExecutorService)
                {
                    mExecutorService = Executors.newCachedThreadPool();
                }
                if(null == mReadService)
                {
                    mReadService = Executors.newFixedThreadPool(1);
                }
                if(null == mWriteService)
                {
                    mWriteService = Executors.newFixedThreadPool(1);
                }
                if(null == mSequenceService)
                {
                    mSequenceService = Executors.newFixedThreadPool(1);
                }

                for(int i = 0; i < mClientCfg.mThreadNum; i++)
                {
                    MsgLoopThread msgLoopThread;
                    if (i == 0)
                    {
                        msgLoopThread = new MsgLoopThread(this,
                                mPackDataSyncQueue, mTimeOutThread, !mbServer);
                        msgLoopThread.setClassLoader(loader);
                        mSyncMsgLooperThread = msgLoopThread;
                        mSequenceService.execute(msgLoopThread);
                    }
                    else
                    {
                        msgLoopThread = new MsgLoopThread(this,
                                mPackDataAsynQueue, !mbServer);
                        msgLoopThread.setClassLoader(loader);
                        mExecutorService.execute(msgLoopThread);
                    }

                }

                mWriteService.execute(mSendThread);
                mReadService.execute(mReceiveThread);
                mExecutorService.execute(mTimeOutThread);

                if(!RegServer())
                {
                    try {
                        Thread.sleep(mClientCfg.mBrokenInterval);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }

                    Close();

                    mExecutorService = Executors.newCachedThreadPool();
                    mExecutorService.execute(mConnectThread);
                    return;
                }
                else
                {
                    mStatus = EConnectStatus.ConnectStatus_Online;

                    RegSubscribe();
                    if(mClientCfg.mHeartBeatSwitch)
                    {
                        mExecutorService.execute(mHeartBeatThread);
                    }
                }
            }
            else if(EConnectStatus.ConnectStatus_Offline == status)
            {
                Close();
                mExecutorService = Executors.newCachedThreadPool();
                mExecutorService.execute(mConnectThread);
            }

            mIsConnected = (EConnectStatus.ConnectStatus_Offline != status);
            for (OnConnectChangedListener listener:setStatusListener) {
                listener.onConnectChanged(status);
            }
        }
    }

    @Override
    public int GetAddr()
    {
        return mSendThread.GetAddr();
    }


//    public boolean RegisterService(Integer serviceid,String servicename,List<String> functionlist,String javapackage)
//    {
//        MsgExpress.RegisterService.Builder builder= MsgExpress.RegisterService.newBuilder();
//        builder.setServiceid(serviceid);
//        builder.setServicename(servicename);
//        builder.setJavapackage(javapackage);
//        builder.addAllFunctions(functionlist);
//        MsgExpress.RegisterService regservice=builder.build();
//        CommandParse.Register(regservice);
//        GeneratedMessage resp=SendMessage(regservice, 10000, null);
//        if(resp!=null)
//            Log.info(resp.toString());
//        boolean ret=(resp!=null && resp.getClass()==MsgExpress.CommonResponse.class);
//        return ret;
//    }

    private boolean RegServer()
    {
        MsgExpress.LoginInfo.Builder serverinfo = MsgExpress.LoginInfo.newBuilder();
        serverinfo.setType(mClientCfg.mAppType);
        serverinfo.setName(mClientCfg.mAppName);
        serverinfo.setGroup(mClientCfg.mAppGroup);
        serverinfo.setUuid(mClientCfg.mUuid);
        serverinfo.setAuth(mClientCfg.mAuth);
        serverinfo.setStarttime(System.currentTimeMillis() / 1000);
        serverinfo.setIp(Utilities.getIpAddress());

        MsgExpress.Login.Builder login=MsgExpress.Login.newBuilder();
        login.setInfo(serverinfo);
        if(provider!=null){
            MsgExpress.ServiceInfo service=provider.getServiceInfo();
            login.addServices(service);
        }
        Object rsp = this.SendMessage(login.build(), mClientCfg.mConnectTimeOut, null);
        if(null == rsp || !(rsp instanceof MsgExpress.LoginResponse))
        {
            Log.error("Reg server, rsp error");
            return false;
        }
        else
        	Log.info("Reg server success");
        MsgExpress.LoginResponse Response = (MsgExpress.LoginResponse)rsp;
        for(MsgExpress.ServiceInfo service:Response.getServicesList())
            CommandParse.Register(service);
        mSendThread.SetAddr(Response.getAddr());

        return true;
    }


    private void RegSubscribe()
    {
        int subid=10000;
        MsgExpress.ComplexSubscribeData.Builder builder = MsgExpress.ComplexSubscribeData.newBuilder();
        for(int i = 0; i < mClientCfg.mSubscribeList.size(); i++)
        {
            String sub = mClientCfg.mSubscribeList.get(i);
            Integer topic = null;
            try {
                topic = Integer.valueOf(sub);
            } catch (Exception e) {
                topic = CommandParse.GetCommand(sub);
            }
            if(topic != null)
            {
                MsgExpress.SubscribeData.Builder dataBuilder = builder.addSubBuilder();
                dataBuilder.setSubid(subid++);
                dataBuilder.setTopic(topic);

            }
            else
            {
                Log.error("Not find class cmd when subscribe, class:" + sub);
            }
        }

        Set<Integer> setTopic=new HashSet<>();
        for(OnPublishListener listener:setPublishListener)
        {
        	List<Integer> list=listener.getTopics();
        	if(list!=null)
        		setTopic.addAll(list);
        };
        for(Integer topic:setTopic)
        {
            if(topic!=null)
            {
                MsgExpress.SubscribeData.Builder dataBuilder = builder.addSubBuilder();
                dataBuilder.setSubid(subid++);
                dataBuilder.setTopic(topic);
            }
        }
        if(builder.getSubCount()<1)
            return ;
        Object rsp = this.SendMessage(builder.build(), mClientCfg.mConnectTimeOut, null);
        if(null == rsp || !(rsp instanceof MsgExpress.CommonResponse))
        {
            Log.error("Subscribe response error");
        }
        else
        {
            if(((MsgExpress.CommonResponse)rsp).getRetcode() == 0)
            {
                Log.info("Subscribe Success!");
            }
            else
            {
                Log.error("Subscribe failed, errorCode = " + ((MsgExpress.CommonResponse)rsp).getRetcode());
            }
        }
    }

    private void Close()
    {
        Log.info("Close");
        mStatus = EConnectStatus.ConnectStatus_Offline;
        mSendThread.AddEmptyMsg();
        if(null != mExecutorService)
        {
            mExecutorService.shutdownNow();
            mExecutorService = null;
        }
        if(null != mReadService)
        {
            mReadService.shutdownNow();
            mReadService = null;
        }
        if(null != mWriteService)
        {
            mWriteService.shutdownNow();
            mWriteService = null;
        }
        if(null != mSequenceService)
        {
            mSequenceService.shutdownNow();
            mSequenceService = null;
        }
        if(null != mPackDataSyncQueue)
        {
            mPackDataSyncQueue.clear();
        }
        if(null != mPackDataAsynQueue)
        {
            mPackDataAsynQueue.clear();
        }
        mCallBackMap.clear();
        mConnectThread.Close();

        mWaitHeartBeatRspNum = 0;
    }


    @Override
    public void OnMessageTimeOut(int serNum) {
        // TODO Auto-generated method stub
        MsgCallbackInfo info = mCallBackMap.remove(serNum);
        if(null != info)
        {
            Log.error("msg time out,serNum = " + serNum);
            info.params.result = 1;
            info.params.errMsg = "Time Out";

            info.cb.OnResponse(info.params);
        }
        if(mSyncMsgLooperThread != null)
        {
            mSyncMsgLooperThread.OnMessageTimeOut(serNum);
        }
    }

    class MsgCallbackInfo
    {
        public PostMsgCallBack cb;
        public PostMsgCallBack.MsgParams params = new PostMsgCallBack.MsgParams();
    }

    public interface ServiceProvider {
        void onRequest( PackageData request);
        MsgExpress.ServiceInfo getServiceInfo();
    }

    public interface OnPublishListener {
        void onPublish( PackageData msg);
        List<Integer> getTopics();
    }

    public interface OnConnectChangedListener {
        void onConnectChanged( EConnectStatus status);
    }

    public void setServiceProvider(ServiceProvider provider)
    {
        this.provider=provider;
    }

    public void addPublishListener(OnPublishListener listener)
    {
        synchronized (setPublishListener) {
            setPublishListener.add(listener);
        }
    }
    public void removePublishListener(OnPublishListener listener)
    {
        synchronized (setPublishListener) {
            setPublishListener.remove(listener);
        }
    }

    public void addConnectChangedListener(OnConnectChangedListener listener)
    {
        synchronized (setStatusListener) {
            setStatusListener.add(listener);
        }
    }
    public void removeConnectChangedListener(OnConnectChangedListener listener)
    {
        synchronized (setStatusListener) {
            setStatusListener.remove(listener);
        }
    }

    private ServiceProvider provider=null;
    private Vector<OnPublishListener> setPublishListener=new Vector<>();
    private Vector<OnConnectChangedListener> setStatusListener=new Vector<>();

    private boolean mbServer;
    private boolean mIsInit;                               // 閺勵垰鎯侀崚婵嗩潗閸栨牞绻�
    private EConnectStatus mStatus;                        // 鏉╃偞甯撮悩鑸碉拷锟�
    private String mCfgPath = null;                        // 闁板秶鐤嗛弬鍥︽鐠侯垰绶�
    private ClientCfg mClientCfg;                          // 闁板秶鐤嗘穱鈩冧紖
    private ServerConnect mConnectThread;                  // 閺堝秴濮熺粩顖濈箾閹恒儳鍤庣粙锟�
    private MsgSendThread mSendThread;                     // 濞戝牊浼呴崣鎴︼拷浣哄殠缁嬶拷
    private MsgReceiveThread mReceiveThread;               // 濞戝牊浼呴幒銉︽暪缁捐法鈻�
    private HeartBeatThread mHeartBeatThread;              // 韫囧啳鐑︾痪璺ㄢ柤
    private TimeOutThread mTimeOutThread;                  // 鐡掑懏妞傚Λ锟藉ù瀣殠缁嬶拷
    private MsgLoopThread mSyncMsgLooperThread;            // 閸氬本顒為梼鐔峰灙缁捐法鈻�
    private BlockingQueue<PackageData> mPackDataSyncQueue; // 濞戝牊浼呴幒銉︽暪閺冭泛绨梼鐔峰灙
    private BlockingQueue<PackageData> mPackDataAsynQueue; // 濞戝牊浼呴幒銉︽暪闂堢偞妞傛惔蹇涙Е閸掞拷
    private ExecutorService mExecutorService;              // 缁捐法鈻煎Ч锟�
    private ExecutorService mReadService;              //
    private ExecutorService mWriteService;              //
    private ExecutorService mSequenceService;              //
    private SyncMessageArray mSyncMsgArray;                // 閸氬本顒炲☉鍫熶紖闁氨鐓＄猾锟�
    private Map<Integer, MsgCallbackInfo> mCallBackMap;    // 瀵倹顒炲☉鍫熶紖鎼存梻鐡熼崶鐐剁殶閹恒儱褰沵ap

    private Integer mWaitHeartBeatRspNum;                  // 缁涘绶熻箛鍐儲鎼存梻鐡熼弫鎵窗

    private boolean mIsConnected;                          // 閺勵垰鎯侀崚婵嗩潗閸栨牞绻涢幒銉﹀灇閸旓拷
}
